1use std::{
6 collections::{BTreeSet, HashMap},
7 mem,
8 sync::Arc,
9};
10
11use indexmap::IndexMap;
12use zebra_chain::{
13 block::{self, Block, Hash, Height},
14 parameters::Network,
15 sprout::{self},
16 transparent,
17};
18
19use crate::{
20 constants::{MAX_INVALIDATED_BLOCKS, MAX_NON_FINALIZED_CHAIN_FORKS},
21 error::ReconsiderError,
22 request::{ContextuallyVerifiedBlock, FinalizableBlock},
23 service::{check, finalized_state::ZebraDb},
24 SemanticallyVerifiedBlock, ValidateContextError,
25};
26
27mod chain;
28
29#[cfg(test)]
30mod tests;
31
32pub(crate) use chain::{Chain, SpendingTransactionId};
33
/// The in-memory set of non-finalized chains, together with the blocks that
/// were explicitly invalidated.
pub struct NonFinalizedState {
    /// The candidate chains, ordered by [`Chain`]'s `Ord` implementation.
    ///
    /// The last (greatest) chain in the set is treated as the best chain.
    /// When the set grows past `MAX_NON_FINALIZED_CHAIN_FORKS`, chains are
    /// evicted from the front of the set.
    chain_set: BTreeSet<Arc<Chain>>,

    /// Blocks removed from a chain by `invalidate_block`, keyed by the height
    /// of the first block in each removed run.
    ///
    /// Holds at most `MAX_INVALIDATED_BLOCKS` entries; entries at the front of
    /// the map are dropped first.
    invalidated_blocks: IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>>,

    /// The network passed to `Chain::new` when a new chain is created.
    pub network: Network,

    /// Whether this instance updates metrics and progress bars.
    ///
    /// Set to `true` by `new()` and to `false` by `disable_metrics()`.
    should_count_metrics: bool,

    /// Progress bar showing the number of chains in `chain_set`.
    #[cfg(feature = "progress-bar")]
    chain_count_bar: Option<howudoin::Tx>,

    /// One progress bar per chain, updated in best-chain-first order.
    #[cfg(feature = "progress-bar")]
    chain_fork_length_bars: Vec<howudoin::Tx>,
}
81
82impl std::fmt::Debug for NonFinalizedState {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let mut f = f.debug_struct("NonFinalizedState");
85
86 f.field("chain_set", &self.chain_set)
87 .field("network", &self.network);
88
89 f.field("should_count_metrics", &self.should_count_metrics);
90
91 f.finish()
92 }
93}
94
95impl Clone for NonFinalizedState {
96 fn clone(&self) -> Self {
97 Self {
98 chain_set: self.chain_set.clone(),
99 network: self.network.clone(),
100 invalidated_blocks: self.invalidated_blocks.clone(),
101 should_count_metrics: self.should_count_metrics,
102 #[cfg(feature = "progress-bar")]
104 chain_count_bar: None,
105 #[cfg(feature = "progress-bar")]
106 chain_fork_length_bars: Vec::new(),
107 }
108 }
109}
110
111impl NonFinalizedState {
112 pub fn new(network: &Network) -> NonFinalizedState {
114 NonFinalizedState {
115 chain_set: Default::default(),
116 network: network.clone(),
117 invalidated_blocks: Default::default(),
118 should_count_metrics: true,
119 #[cfg(feature = "progress-bar")]
120 chain_count_bar: None,
121 #[cfg(feature = "progress-bar")]
122 chain_fork_length_bars: Vec::new(),
123 }
124 }
125
126 #[cfg(any(test, feature = "proptest-impl"))]
137 #[allow(dead_code)]
138 pub fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
139 self.chain_set.len() == other.chain_set.len()
143 && self
144 .chain_set
145 .iter()
146 .zip(other.chain_set.iter())
147 .all(|(self_chain, other_chain)| self_chain.eq_internal_state(other_chain))
148 && self.network == other.network
149 }
150
151 pub fn chain_iter(&self) -> impl Iterator<Item = &Arc<Chain>> {
155 self.chain_set.iter().rev()
156 }
157
158 fn insert_with<F>(&mut self, chain: Arc<Chain>, chain_filter: F)
161 where
162 F: FnOnce(&mut BTreeSet<Arc<Chain>>),
163 {
164 self.chain_set.insert(chain);
165
166 chain_filter(&mut self.chain_set);
167
168 while self.chain_set.len() > MAX_NON_FINALIZED_CHAIN_FORKS {
169 self.chain_set.pop_first();
171 }
172
173 self.update_metrics_bars();
174 }
175
176 fn insert(&mut self, chain: Arc<Chain>) {
178 self.insert_with(chain, |_ignored_chain| { })
179 }
180
181 pub fn finalize(&mut self) -> FinalizableBlock {
184 #[allow(clippy::mutable_key_type)]
188 let chains = mem::take(&mut self.chain_set);
189 let mut chains = chains.into_iter();
190
191 let mut best_chain = chains.next_back().expect("there's at least one chain");
193
194 let mut_best_chain = Arc::make_mut(&mut best_chain);
196
197 let side_chains = chains;
199
200 let (best_chain_root, root_treestate) = mut_best_chain.pop_root();
203
204 if !best_chain.is_empty() {
206 self.insert(best_chain);
207 }
208
209 for mut side_chain in side_chains.rev() {
211 if side_chain.non_finalized_root_hash() != best_chain_root.hash {
212 drop(side_chain);
215
216 continue;
217 }
218
219 let mut_side_chain = Arc::make_mut(&mut side_chain);
223
224 let (side_chain_root, _treestate) = mut_side_chain.pop_root();
226 assert_eq!(side_chain_root.hash, best_chain_root.hash);
227
228 self.insert(side_chain);
230 }
231
232 self.invalidated_blocks
234 .retain(|height, _blocks| *height >= best_chain_root.height);
235
236 self.update_metrics_for_chains();
237
238 FinalizableBlock::new(best_chain_root, root_treestate)
240 }
241
242 #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
246 pub fn commit_block(
247 &mut self,
248 prepared: SemanticallyVerifiedBlock,
249 finalized_state: &ZebraDb,
250 ) -> Result<(), ValidateContextError> {
251 let parent_hash = prepared.block.header.previous_block_hash;
252 let (height, hash) = (prepared.height, prepared.hash);
253
254 let parent_chain = self.parent_chain(parent_hash)?;
255
256 let modified_chain = self.validate_and_commit(parent_chain, prepared, finalized_state)?;
259
260 self.insert_with(modified_chain, |chain_set| {
265 chain_set.retain(|chain| chain.non_finalized_tip_hash() != parent_hash)
266 });
267
268 self.update_metrics_for_committed_block(height, hash);
269
270 Ok(())
271 }
272
273 pub fn invalidate_block(&mut self, block_hash: Hash) {
276 let Some(chain) = self.find_chain(|chain| chain.contains_block_hash(block_hash)) else {
277 return;
278 };
279
280 let invalidated_blocks = if chain.non_finalized_root_hash() == block_hash {
281 self.chain_set.remove(&chain);
282 chain.blocks.values().cloned().collect()
283 } else {
284 let (new_chain, invalidated_blocks) = chain
285 .invalidate_block(block_hash)
286 .expect("already checked that chain contains hash");
287
288 self.insert_with(Arc::new(new_chain.clone()), |chain_set| {
291 chain_set.retain(|c| !c.contains_block_hash(block_hash))
292 });
293
294 invalidated_blocks
295 };
296
297 self.invalidated_blocks.insert(
298 invalidated_blocks.first().unwrap().clone().height,
299 Arc::new(invalidated_blocks),
300 );
301
302 while self.invalidated_blocks.len() > MAX_INVALIDATED_BLOCKS {
303 self.invalidated_blocks.shift_remove_index(0);
304 }
305
306 self.update_metrics_for_chains();
307 self.update_metrics_bars();
308 }
309
310 pub fn reconsider_block(
314 &mut self,
315 block_hash: block::Hash,
316 finalized_state: &ZebraDb,
317 ) -> Result<(), ReconsiderError> {
318 let height = self
320 .invalidated_blocks
321 .iter()
322 .find_map(|(height, blocks)| {
323 if blocks.first()?.hash == block_hash {
324 Some(height)
325 } else {
326 None
327 }
328 })
329 .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?;
330
331 let mut invalidated_blocks = self
332 .invalidated_blocks
333 .clone()
334 .shift_remove(height)
335 .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?;
336 let mut_blocks = Arc::make_mut(&mut invalidated_blocks);
337
338 let invalidated_root = mut_blocks
341 .first()
342 .ok_or(ReconsiderError::InvalidatedBlocksEmpty)?;
343
344 let root_parent_hash = invalidated_root.block.header.previous_block_hash;
345
346 let chain_result = if root_parent_hash == finalized_state.finalized_tip_hash() {
349 let chain = Chain::new(
350 &self.network,
351 finalized_state
352 .finalized_tip_height()
353 .ok_or(ReconsiderError::ParentChainNotFound(block_hash))?,
354 finalized_state.sprout_tree_for_tip(),
355 finalized_state.sapling_tree_for_tip(),
356 finalized_state.orchard_tree_for_tip(),
357 finalized_state.history_tree(),
358 finalized_state.finalized_value_pool(),
359 );
360 Arc::new(chain)
361 } else {
362 self.parent_chain(root_parent_hash)
365 .map_err(|_| ReconsiderError::ParentChainNotFound(block_hash))?
366 };
367
368 let mut modified_chain = Arc::unwrap_or_clone(chain_result);
369 for block in Arc::unwrap_or_clone(invalidated_blocks) {
370 modified_chain = modified_chain.push(block)?;
371 }
372
373 let (height, hash) = modified_chain.non_finalized_tip();
374
375 if let Some(best_chain_root_height) = finalized_state.finalized_tip_height() {
378 self.invalidated_blocks
379 .retain(|height, _blocks| *height >= best_chain_root_height);
380 }
381
382 self.insert_with(Arc::new(modified_chain), |chain_set| {
383 chain_set.retain(|chain| chain.non_finalized_tip_hash() != root_parent_hash)
384 });
385
386 self.update_metrics_for_committed_block(height, hash);
387
388 Ok(())
389 }
390
391 #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
394 #[allow(clippy::unwrap_in_result)]
395 pub fn commit_new_chain(
396 &mut self,
397 prepared: SemanticallyVerifiedBlock,
398 finalized_state: &ZebraDb,
399 ) -> Result<(), ValidateContextError> {
400 let finalized_tip_height = finalized_state.finalized_tip_height();
401
402 #[cfg(not(test))]
404 let finalized_tip_height = finalized_tip_height.expect("finalized state contains blocks");
405 #[cfg(test)]
406 let finalized_tip_height = finalized_tip_height.unwrap_or(zebra_chain::block::Height(0));
407
408 let chain = Chain::new(
409 &self.network,
410 finalized_tip_height,
411 finalized_state.sprout_tree_for_tip(),
412 finalized_state.sapling_tree_for_tip(),
413 finalized_state.orchard_tree_for_tip(),
414 finalized_state.history_tree(),
415 finalized_state.finalized_value_pool(),
416 );
417
418 let (height, hash) = (prepared.height, prepared.hash);
419
420 let chain = self.validate_and_commit(Arc::new(chain), prepared, finalized_state)?;
422
423 self.insert(chain);
425 self.update_metrics_for_committed_block(height, hash);
426
427 Ok(())
428 }
429
430 #[tracing::instrument(level = "debug", skip(self, finalized_state, new_chain))]
436 fn validate_and_commit(
437 &self,
438 new_chain: Arc<Chain>,
439 prepared: SemanticallyVerifiedBlock,
440 finalized_state: &ZebraDb,
441 ) -> Result<Arc<Chain>, ValidateContextError> {
442 if self.invalidated_blocks.contains_key(&prepared.height) {
443 return Err(ValidateContextError::BlockPreviouslyInvalidated {
444 block_hash: prepared.hash,
445 });
446 }
447
448 let spent_utxos = check::utxo::transparent_spend(
452 &prepared,
453 &new_chain.unspent_utxos(),
454 &new_chain.spent_utxos,
455 finalized_state,
456 )?;
457
458 check::anchors::block_sapling_orchard_anchors_refer_to_final_treestates(
460 finalized_state,
461 &new_chain,
462 &prepared,
463 )?;
464
465 let sprout_final_treestates = check::anchors::block_fetch_sprout_final_treestates(
467 finalized_state,
468 &new_chain,
469 &prepared,
470 );
471
472 let contextual = ContextuallyVerifiedBlock::with_block_and_spent_utxos(
474 prepared.clone(),
475 spent_utxos.clone(),
476 )
477 .map_err(|value_balance_error| {
478 ValidateContextError::CalculateBlockChainValueChange {
479 value_balance_error,
480 height: prepared.height,
481 block_hash: prepared.hash,
482 transaction_count: prepared.block.transactions.len(),
483 spent_utxo_count: spent_utxos.len(),
484 }
485 })?;
486
487 Self::validate_and_update_parallel(new_chain, contextual, sprout_final_treestates)
488 }
489
490 #[allow(clippy::unwrap_in_result)]
492 #[tracing::instrument(skip(new_chain, sprout_final_treestates))]
493 fn validate_and_update_parallel(
494 new_chain: Arc<Chain>,
495 contextual: ContextuallyVerifiedBlock,
496 sprout_final_treestates: HashMap<sprout::tree::Root, Arc<sprout::tree::NoteCommitmentTree>>,
497 ) -> Result<Arc<Chain>, ValidateContextError> {
498 let mut block_commitment_result = None;
499 let mut sprout_anchor_result = None;
500 let mut chain_push_result = None;
501
502 let block = contextual.block.clone();
504 let network = new_chain.network();
505 let history_tree = new_chain.history_block_commitment_tree();
506
507 let block2 = contextual.block.clone();
508 let height = contextual.height;
509 let transaction_hashes = contextual.transaction_hashes.clone();
510
511 rayon::in_place_scope_fifo(|scope| {
512 scope.spawn_fifo(|_scope| {
513 block_commitment_result = Some(check::block_commitment_is_valid_for_chain_history(
514 block,
515 &network,
516 &history_tree,
517 ));
518 });
519
520 scope.spawn_fifo(|_scope| {
521 sprout_anchor_result =
522 Some(check::anchors::block_sprout_anchors_refer_to_treestates(
523 sprout_final_treestates,
524 block2,
525 transaction_hashes,
526 height,
527 ));
528 });
529
530 scope.spawn_fifo(|_scope| {
536 let new_chain = Arc::try_unwrap(new_chain)
539 .unwrap_or_else(|shared_chain| (*shared_chain).clone());
540 chain_push_result = Some(new_chain.push(contextual).map(Arc::new));
541 });
542 });
543
544 block_commitment_result.expect("scope has finished")?;
546 sprout_anchor_result.expect("scope has finished")?;
547
548 chain_push_result.expect("scope has finished")
549 }
550
551 pub fn best_chain_len(&self) -> Option<u32> {
554 Some(self.best_chain()?.blocks.len() as u32)
557 }
558
559 #[allow(dead_code)]
562 pub fn any_chain_contains(&self, hash: &block::Hash) -> bool {
563 self.chain_set
564 .iter()
565 .rev()
566 .any(|chain| chain.height_by_hash.contains_key(hash))
567 }
568
569 pub fn find_chain<P>(&self, mut predicate: P) -> Option<Arc<Chain>>
574 where
575 P: FnMut(&Chain) -> bool,
576 {
577 self.chain_set
579 .iter()
580 .rev()
581 .find(|chain| predicate(chain))
582 .cloned()
583 }
584
585 pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
590 self.chain_set
591 .iter()
592 .rev()
593 .find_map(|chain| chain.created_utxo(outpoint))
594 }
595
596 #[allow(dead_code)]
598 pub fn any_block_by_hash(&self, hash: block::Hash) -> Option<Arc<Block>> {
599 for chain in self.chain_set.iter().rev() {
601 if let Some(prepared) = chain
602 .height_by_hash
603 .get(&hash)
604 .and_then(|height| chain.blocks.get(height))
605 {
606 return Some(prepared.block.clone());
607 }
608 }
609
610 None
611 }
612
613 #[allow(dead_code)]
615 pub fn any_prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
616 self.any_block_by_hash(hash)
618 .map(|block| block.header.previous_block_hash)
619 }
620
621 #[allow(dead_code)]
623 pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
624 self.best_chain()?
625 .blocks
626 .get(&height)
627 .map(|prepared| prepared.hash)
628 }
629
630 #[allow(dead_code)]
632 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
633 let best_chain = self.best_chain()?;
634 let height = best_chain.non_finalized_tip_height();
635 let hash = best_chain.non_finalized_tip_hash();
636
637 Some((height, hash))
638 }
639
640 #[allow(dead_code)]
642 pub fn best_tip_block(&self) -> Option<&ContextuallyVerifiedBlock> {
643 let best_chain = self.best_chain()?;
644
645 best_chain.tip_block()
646 }
647
648 #[allow(dead_code)]
650 pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
651 let best_chain = self.best_chain()?;
652 let height = *best_chain.height_by_hash.get(&hash)?;
653 Some(height)
654 }
655
656 #[allow(dead_code)]
658 pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
659 for chain in self.chain_set.iter().rev() {
660 if let Some(height) = chain.height_by_hash.get(&hash) {
661 return Some(*height);
662 }
663 }
664
665 None
666 }
667
668 #[cfg(any(test, feature = "proptest-impl"))]
670 #[allow(dead_code)]
671 pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {
672 self.best_chain()
673 .map(|best_chain| best_chain.sprout_nullifiers.contains_key(sprout_nullifier))
674 .unwrap_or(false)
675 }
676
677 #[cfg(any(test, feature = "proptest-impl"))]
679 #[allow(dead_code)]
680 pub fn best_contains_sapling_nullifier(
681 &self,
682 sapling_nullifier: &zebra_chain::sapling::Nullifier,
683 ) -> bool {
684 self.best_chain()
685 .map(|best_chain| {
686 best_chain
687 .sapling_nullifiers
688 .contains_key(sapling_nullifier)
689 })
690 .unwrap_or(false)
691 }
692
693 #[cfg(any(test, feature = "proptest-impl"))]
695 #[allow(dead_code)]
696 pub fn best_contains_orchard_nullifier(
697 &self,
698 orchard_nullifier: &zebra_chain::orchard::Nullifier,
699 ) -> bool {
700 self.best_chain()
701 .map(|best_chain| {
702 best_chain
703 .orchard_nullifiers
704 .contains_key(orchard_nullifier)
705 })
706 .unwrap_or(false)
707 }
708
709 pub fn best_chain(&self) -> Option<&Arc<Chain>> {
711 self.chain_iter().next()
712 }
713
714 pub fn chain_count(&self) -> usize {
716 self.chain_set.len()
717 }
718
719 pub fn invalidated_blocks(&self) -> IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>> {
721 self.invalidated_blocks.clone()
722 }
723
724 fn parent_chain(&self, parent_hash: block::Hash) -> Result<Arc<Chain>, ValidateContextError> {
729 match self.find_chain(|chain| chain.non_finalized_tip_hash() == parent_hash) {
730 Some(chain) => Ok(chain.clone()),
732 None => {
734 let fork_chain = self
737 .chain_set
738 .iter()
739 .rev()
740 .find_map(|chain| chain.fork(parent_hash))
741 .ok_or(ValidateContextError::NotReadyToBeCommitted)?;
742
743 Ok(Arc::new(fork_chain))
744 }
745 }
746 }
747
748 fn should_count_metrics(&self) -> bool {
750 self.should_count_metrics
751 }
752
753 fn update_metrics_for_committed_block(&self, height: block::Height, hash: block::Hash) {
755 if !self.should_count_metrics() {
756 return;
757 }
758
759 metrics::counter!("state.memory.committed.block.count").increment(1);
760 metrics::gauge!("state.memory.committed.block.height").set(height.0 as f64);
761
762 if self
763 .best_chain()
764 .expect("metrics are only updated after initialization")
765 .non_finalized_tip_hash()
766 == hash
767 {
768 metrics::counter!("state.memory.best.committed.block.count").increment(1);
769 metrics::gauge!("state.memory.best.committed.block.height").set(height.0 as f64);
770 }
771
772 self.update_metrics_for_chains();
773 }
774
775 fn update_metrics_for_chains(&self) {
777 if !self.should_count_metrics() {
778 return;
779 }
780
781 metrics::gauge!("state.memory.chain.count").set(self.chain_set.len() as f64);
782 metrics::gauge!("state.memory.best.chain.length",)
783 .set(self.best_chain_len().unwrap_or_default() as f64);
784 }
785
786 fn update_metrics_bars(&mut self) {
789 if !self.should_count_metrics() {
792 #[allow(clippy::needless_return)]
793 return;
794 }
795
796 #[cfg(feature = "progress-bar")]
797 {
798 use std::cmp::Ordering::*;
799
800 if matches!(howudoin::cancelled(), Some(true)) {
801 self.disable_metrics();
802 return;
803 }
804
805 if self.chain_count_bar.is_none() {
807 self.chain_count_bar = Some(howudoin::new_root().label("Chain Forks"));
808 }
809
810 let chain_count_bar = self
811 .chain_count_bar
812 .as_ref()
813 .expect("just initialized if missing");
814 let finalized_tip_height = self
815 .best_chain()
816 .map(|chain| chain.non_finalized_root_height().0 - 1);
817
818 chain_count_bar.set_pos(u64::try_from(self.chain_count()).expect("fits in u64"));
819 if let Some(finalized_tip_height) = finalized_tip_height {
822 chain_count_bar.desc(format!("Finalized Root {finalized_tip_height}"));
823 }
824
825 let prev_length_bars = self.chain_fork_length_bars.len();
827
828 match self.chain_count().cmp(&prev_length_bars) {
829 Greater => self
830 .chain_fork_length_bars
831 .resize_with(self.chain_count(), || {
832 howudoin::new_with_parent(chain_count_bar.id())
833 }),
834 Less => {
835 let redundant_bars = self.chain_fork_length_bars.split_off(self.chain_count());
836 for bar in redundant_bars {
837 bar.close();
838 }
839 }
840 Equal => {}
841 }
842
843 for (chain_length_bar, chain) in
846 std::iter::zip(self.chain_fork_length_bars.iter(), self.chain_iter())
847 {
848 let fork_height = chain
849 .last_fork_height
850 .unwrap_or_else(|| chain.non_finalized_tip_height())
851 .0;
852
853 chain_length_bar
857 .label(format!("Fork {fork_height}"))
858 .set_pos(u64::try_from(chain.len()).expect("fits in u64"));
859 let mut desc = String::new();
869
870 if let Some(recent_fork_height) = chain.recent_fork_height() {
871 let recent_fork_length = chain
872 .recent_fork_length()
873 .expect("just checked recent fork height");
874
875 let mut plural = "s";
876 if recent_fork_length == 1 {
877 plural = "";
878 }
879
880 desc.push_str(&format!(
881 " at {recent_fork_height:?} + {recent_fork_length} block{plural}"
882 ));
883 }
884
885 chain_length_bar.desc(desc);
886 }
887 }
888 }
889
890 pub fn disable_metrics(&mut self) {
892 self.should_count_metrics = false;
893
894 #[cfg(feature = "progress-bar")]
895 {
896 let count_bar = self.chain_count_bar.take().into_iter();
897 let fork_bars = self.chain_fork_length_bars.drain(..);
898 count_bar.chain(fork_bars).for_each(howudoin::Tx::close);
899 }
900 }
901}
902
903impl Drop for NonFinalizedState {
904 fn drop(&mut self) {
905 self.disable_metrics();
906 }
907}